{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "802ec6a1-b608-40a5-931f-5e7dfb2d7046",
   "metadata": {},
   "source": [
    "# Step 1: Real-Time Feature Computation\n",
    "\n",
    "This notebook is part of a demo showcasing a real-time fraud detection pipeline, utilizing Feldera for feature computation and Hopsworks as the feature store.\n",
    "\n",
    "![Real-time feature engineering pipeline using Feldera and Hosworks](./architecture.png)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4d98b4fc-f56a-442a-86c2-c94e73b552c1",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from feldera import FelderaClient, SQLContext, SQLSchema"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6415e0f3-41b5-4a0d-9f61-437cbd878d59",
   "metadata": {},
   "source": [
    "## Set Hopsworks API KEY"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a6dd4a65-8a50-4da7-bd73-8721ffdb25b2",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from ipython_secrets import *\n",
    "KEY = get_secret('HOPSWORKS_API_KEY')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b532c124-cb6b-4779-8f26-9acaa509a89b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Connect to the Feldera API\n",
    "\n",
    "# Use Feldera online sandbox\n",
    "# client = FelderaClient(\"https://try.feldera.com\", api_key = get_secret('FELDERA_API_KEY'))\n",
    "\n",
    "# Use local Feldera instance\n",
    "client = FelderaClient(\"http://localhost:8080\")\n",
    "\n",
    "sql = SQLContext(\"hopsworks_kafka\", client).get_or_create()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b2bc94df-de0a-4faa-93b1-c98cf2948b9e",
   "metadata": {},
   "source": [
    "## Step 1.1. Create Hopsworks feature groups"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d2037b50-de20-43f4-b8cc-c67196f920f7",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import hopsworks\n",
    "from hsfs import engine\n",
    "from hsfs.feature import Feature\n",
    "import json\n",
    "\n",
    "# Connect to Hopsworks.\n",
    "project = hopsworks.login(host=\"c.app.hopsworks.ai\", api_key_value=KEY)\n",
    "\n",
    "kafka_api = project.get_kafka_api()\n",
    "KAFKA_OUTPUT_TOPICS = [\"transactions_fraud_streaming_fg_\" + str(project.id), \"transactions_aggs_fraud_streaming_fg_\" + str(project.id)]\n",
    "\n",
    "fs = project.get_feature_store()\n",
    "\n",
    "# Create feature groups to store Feldera outputs.\n",
    "\n",
    "# COMBINED - features that extend credit card transaction records with attributes extracted from the card\n",
    "# holder's profile, such as their age at the time of the transaction and the number of days until the credit card expires.\n",
    "combined_fg = fs.get_or_create_feature_group(\n",
    "        name=KAFKA_OUTPUT_TOPICS[0],\n",
    "        primary_key=[\"cc_num\"],\n",
    "        online_enabled=True,\n",
    "        version=1,\n",
    "        topic_name=KAFKA_OUTPUT_TOPICS[0],\n",
    "        event_time=\"date_time\",\n",
    "        stream=True,\n",
    "        features=[\n",
    "            Feature(\"tid\", type=\"string\"),\n",
    "            Feature(\"date_time\", type=\"timestamp\"),\n",
    "            Feature(\"cc_num\", type=\"string\"),\n",
    "            Feature(\"category\", type=\"string\"),\n",
    "            Feature(\"amount\", type=\"double\"),\n",
    "            Feature(\"latitude\", type=\"double\"),\n",
    "            Feature(\"longitude\", type=\"double\"),\n",
    "            Feature(\"city\", type=\"string\"),\n",
    "            Feature(\"country\", type=\"string\"),\n",
    "            Feature(\"fraud_label\", type=\"int\"),\n",
    "            Feature(\"age_at_transaction\", type=\"int\"),\n",
    "            Feature(\"days_until_card_expires\", type=\"int\"),\n",
    "            Feature(\"cc_expiration_date\", type=\"timestamp\"),\n",
    "        ],\n",
    ")\n",
    "\n",
    "try:\n",
    "    combined_fg.save()\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "if KAFKA_OUTPUT_TOPICS[0] not in [topic.name for topic in kafka_api.get_topics()]:\n",
    "    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[0], json.loads(combined_fg.avro_schema))\n",
    "    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[0], KAFKA_OUTPUT_TOPICS[0], 1, replicas=1, partitions=1)\n",
    "\n",
    "# WINDOWED - frequency of transactions and other metrics in the span of a few hours, modeled as hopping window aggregates.\n",
    "windowed_fg = fs.get_or_create_feature_group(\n",
    "    name=str(KAFKA_OUTPUT_TOPICS[1]),\n",
    "    primary_key=[\"cc_num\"],\n",
    "    online_enabled=True,\n",
    "    version=1,\n",
    "    topic_name=KAFKA_OUTPUT_TOPICS[1],\n",
    "    event_time=\"date_time\",\n",
    "    stream=True,\n",
    "    features=[\n",
    "        Feature(\"avg_amt\", type=\"double\"),\n",
    "        Feature(\"trans\", type=\"bigint\"),\n",
    "        Feature(\"stddev_amt\", type=\"double\"),\n",
    "        Feature(\"date_time\", type=\"timestamp\"),\n",
    "        Feature(\"cc_num\", type=\"string\"),\n",
    "    ],\n",
    ")\n",
    "\n",
    "try:\n",
    "    windowed_fg.save()\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "if KAFKA_OUTPUT_TOPICS[1] not in [topic.name for topic in kafka_api.get_topics()]:\n",
    "    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[1], json.loads(windowed_fg.avro_schema))\n",
    "    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[1], KAFKA_OUTPUT_TOPICS[1], 1, replicas=1, partitions=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "176854c0-2b0a-4e94-a40b-9039f12e302d",
   "metadata": {},
   "source": [
    "## Step 1.2. Create Feldera pipeline\n",
    "\n",
    "We build a Feldera pipeline to transform raw transaction and profile data into features. In Feldera, feature groups are modeled as SQL views. Thus, we create a SQL program with two input tables (TRANSACTIONS and PROFILES), and two output views, one for each feature group.\n",
    "\n",
    "![Feldera pipeline](./feldera_pipeline.png)\n",
    "\n",
    "### Declare input tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2fb8e951-69e3-4ddc-8e61-7c8ddbba3be1",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Register input tables with Feldera.\n",
    "sql.register_table(\"TRANSACTIONS\", SQLSchema({\n",
    "    \"tid\": \"STRING\",\n",
    "    \"date_time\": \"TIMESTAMP\",\n",
    "    \"cc_num\": \"STRING\",\n",
    "    \"category\": \"STRING\",\n",
    "    \"amount\": \"DOUBLE\",\n",
    "    \"latitude\": \"DOUBLE\",\n",
    "    \"longitude\": \"DOUBLE\",\n",
    "    \"city\": \"STRING\",\n",
    "    \"country\": \"STRING\",\n",
    "    \"fraud_label\": \"INT\",\n",
    "}))\n",
    "\n",
    "sql.register_table(\"PROFILES\", SQLSchema({\n",
    "    \"cc_num\": \"STRING\",\n",
    "    \"cc_provider\": \"STRING\",\n",
    "    \"cc_type\": \"STRING\",\n",
    "    \"cc_expiration_date\": \"STRING\",\n",
    "    \"name\": \"STRING\",\n",
    "    \"mail\": \"STRING\",\n",
    "    \"birthdate\": \"TIMESTAMP\",\n",
    "    \"age\": \"INT\",\n",
    "    \"city\": \"STRING\",\n",
    "    \"country_of_residence\": \"STRING\",\n",
    "}))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "78d39686-0ae7-42aa-b94b-f2ce33f61f40",
   "metadata": {},
   "source": [
    "### Declare feature views\n",
    "\n",
    "We create two kinds of features: \n",
    "\n",
    "#### 1. **Features that join data from different sources**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "660d6462-e3df-4e37-9355-1329e8d51359",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Convert credit card expiration date from MM/YY formatted string to a TIMESTAMP,\n",
    "# so that we can perform computations on it.\n",
    "sql.register_local_view(\n",
    "    \"cc_expiration\",\n",
    "    f\"\"\"\n",
    "    SELECT\n",
    "        cc_num,\n",
    "        CAST(\n",
    "            CONCAT(\n",
    "                '20',\n",
    "                SUBSTRING(\n",
    "                    cc_expiration_date,\n",
    "                    4,\n",
    "                    2\n",
    "                ),\n",
    "                '-',\n",
    "                SUBSTRING(\n",
    "                    cc_expiration_date,\n",
    "                    1,\n",
    "                    2\n",
    "                ),\n",
    "                '-01 00:00:00'\n",
    "            ) AS TIMESTAMP\n",
    "        ) AS cc_expiration_date\n",
    "    FROM PROFILES\"\"\"\n",
    ")\n",
    "\n",
    "# Compute the age of the individual during the transaction, and the number of days until the\n",
    "# credit card expires from `PROFILES` and `TRANSACTIONS` tables.\n",
    "sql.register_view(\n",
    "    \"combined\",\n",
    "    f\"\"\"\n",
    "    SELECT\n",
    "        T1.*,\n",
    "        T2.cc_expiration_date,\n",
    "        TIMESTAMPDIFF(YEAR, T3.birthdate, T1.date_time) age_at_transaction,\n",
    "        TIMESTAMPDIFF(DAY, T1.date_time, T2.cc_expiration_date) days_until_card_expires\n",
    "    FROM\n",
    "        TRANSACTIONS T1 JOIN cc_expiration T2\n",
    "        ON\n",
    "            T1.cc_num = T2.cc_num\n",
    "        JOIN PROFILES T3\n",
    "    ON\n",
    "        T1.cc_num = T3.cc_num\"\"\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "99e25bb3-49a2-4260-a129-abde1ef8a121",
   "metadata": {},
   "source": [
    "#### 2. **Features that aggregate data over time** "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d2fce2ad-65cd-4d9d-8364-283bd47dbb1b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Create a 4 hour hopping window aggregation from data from transactions table\n",
    "sql.register_local_view(\n",
    "    \"hop\",\n",
    "    f\"\"\"\n",
    "    SELECT * FROM TABLE(HOP(TABLE TRANSACTIONS, DESCRIPTOR(date_time), INTERVAL 4 HOURS, INTERVAL 1 HOURS))\"\"\"\n",
    ")\n",
    "\n",
    "# Compute aggregates from it\n",
    "sql.register_local_view(\n",
    "    \"agg\",\n",
    "    \"\"\"\n",
    "    SELECT\n",
    "        AVG(amount) AS avg_amt,\n",
    "        STDDEV(amount) as stddev_amt,\n",
    "        COUNT(cc_num) as trans,\n",
    "        ARRAY_AGG(date_time) as moments,\n",
    "        cc_num\n",
    "    FROM hop\n",
    "    GROUP BY cc_num, window_start\"\"\"\n",
    ")\n",
    "\n",
    "\n",
    "# Final output view\n",
    "sql.register_view(\n",
    "    \"windowed\",\n",
    "    \"\"\"\n",
    "    SELECT\n",
    "        avg_amt,\n",
    "        trans,\n",
    "        COALESCE(stddev_amt, 0) as stddev_amt,\n",
    "        date_time,\n",
    "        cc_num\n",
    "    FROM agg CROSS JOIN UNNEST(moments) as date_time\"\"\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "19d9d834-6e5b-434e-b158-e2e6888e7839",
   "metadata": {},
   "source": [
    "### Connect Kafka sources and sinks\n",
    "\n",
    "We use the Kafka topic created during the data prep step as the input for the TRANSACTIONS table. The output views are also connected to the Hopsworks feature store via Kafka. Hopsworks ingests data from Kafka using the Avro format, so we configure Feldera Kafka connectors with Avro schemas generated by Hopsworks for each feature group."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4421bcb6-7cc6-49ea-a99e-65f21d683619",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Get Hopsworks public Kafka servers.\n",
    "kafka_config = engine.get_instance()._get_kafka_config(fs.id, {})\n",
    "\n",
    "from feldera.formats import JSONFormat, JSONUpdateFormat\n",
    "\n",
    "in_fmt = JSONFormat().with_update_format(JSONUpdateFormat.Raw).with_array(False)\n",
    "\n",
    "KAFKA_INPUT_TOPIC = \"transactions_topic_\" + str(project.id)\n",
    "source_config = kafka_config | {\"topics\": [KAFKA_INPUT_TOPIC], \"auto.offset.reset\": \"earliest\"}\n",
    "\n",
    "# Connect the Kafka topic created during data prep to the TRANSACTIONS table.\n",
    "sql.connect_source_kafka(\"TRANSACTIONS\", \"hopsworks_transactions_kafka_in\", source_config, in_fmt)\n",
    "\n",
    "def create_sink_config(kafka_config: dict, fg, project_id):\n",
    "    return kafka_config | {\n",
    "        \"topic\": fg.topic_name,\n",
    "        \"auto.offset.reset\": \"earliest\",\n",
    "        \"headers\": [\n",
    "            {\n",
    "                'key': 'projectId',\n",
    "                'value': str(project_id),\n",
    "            },\n",
    "            {\n",
    "                'key': 'featureGroupId',\n",
    "                'value': str(fg.id),\n",
    "            },\n",
    "            {\n",
    "                'key': 'subjectId',\n",
    "                'value': str(fg.subject[\"id\"]),\n",
    "            },\n",
    "        ]\n",
    "    }\n",
    "\n",
    "from feldera.formats import AvroFormat\n",
    "\n",
    "# Set the output format to use the avro schema from the feature group\n",
    "trans_out_fmt = AvroFormat().with_schema(combined_fg.avro_schema).with_skip_schema_id(True)\n",
    "sql.connect_sink_kafka(\"combined\", \"hopsworks_combined_kafka_out\", create_sink_config(kafka_config, combined_fg, project.id), trans_out_fmt)\n",
    "\n",
    "win_out_fmt = AvroFormat().with_schema(windowed_fg.avro_schema).with_skip_schema_id(True)\n",
    "sql.connect_sink_kafka(\"windowed\", \"hopsworks_windowed_kafka_out\", create_sink_config(kafka_config, windowed_fg, project.id), win_out_fmt)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4d661e44-fd7a-4627-a6c5-ac48198611a8",
   "metadata": {},
   "source": [
    "## Step 1.3. Run the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2fece25e-7c9e-4967-bc9b-489af045e999",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Start the Feldera pipeline.\n",
    "sql.start()\n",
    "\n",
    "# Read profile data from the feature store and write it to the `PROFILE` table.\n",
    "profile_fg = fs.get_or_create_feature_group(\n",
    "    name=\"profile\",\n",
    "    version=1\n",
    ")\n",
    "\n",
    "profile_df = profile_fg.read()\n",
    "\n",
    "sql.input_pandas(\"PROFILES\", profile_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8b012bf9-41b2-422b-888c-323e258fdf70",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Run the pipeline for 60s.\n",
    "import time\n",
    "\n",
    "time.sleep(60)\n",
    "\n",
    "sql.shutdown()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c4c9fba-a739-4f21-8440-f6ba60e1c553",
   "metadata": {},
   "source": [
    "## Set a materialization job for the feature group in Hopsworks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6e627f6d-bd5c-4022-8b4b-efaf3e9a304b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import datetime\n",
    "\n",
    "# materialize every 10 minutes\n",
    "combined_fg.materialization_job.schedule(cron_expression = \"0 /10 * ? * * *\", start_time=datetime.datetime.now(tz=datetime.timezone.utc))\n",
    "windowed_fg.materialization_job.schedule(cron_expression = \"0 /10 * ? * * *\", start_time=datetime.datetime.now(tz=datetime.timezone.utc))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "44109b66-314f-4311-85de-0fa4508ff920",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "combined_fg.materialization_job.run()\n",
    "windowed_fg.materialization_job.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bd5fc831-f7c5-47b4-ab50-b87831fdf43d",
   "metadata": {},
   "source": [
    "# Great Expectations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "24891711-453e-4b60-9900-f20d266261bb",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import great_expectations as ge\n",
    "from great_expectations.core import ExpectationSuite, ExpectationConfiguration\n",
    "\n",
    "# Set the expectation suite name to \"transactions_suite\"\n",
    "expectation_suite_transactions = ge.core.ExpectationSuite(\n",
    "    expectation_suite_name=\"transactions_suite\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "189ceb06-fa6e-45ee-ba45-3c7b4116c21e",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Check binary fraud_label column to be in set [0,1]\n",
    "expectation_suite_transactions.add_expectation(\n",
    "    ExpectationConfiguration(\n",
    "        expectation_type=\"expect_column_distinct_values_to_be_in_set\",\n",
    "        kwargs={\n",
    "            \"column\": \"fraud_label\",\n",
    "            \"value_set\": [0, 1],\n",
    "        }\n",
    "    )\n",
    ")\n",
    "\n",
    "# Check amount column to be not negative\n",
    "expectation_suite_transactions.add_expectation(\n",
    "    ExpectationConfiguration(\n",
    "        expectation_type=\"expect_column_values_to_be_between\",\n",
    "        kwargs={\n",
    "            \"column\": \"amount\",\n",
    "            \"min_value\": 0.0,\n",
    "        }\n",
    "    )\n",
    ")\n",
    "\n",
    "# Loop through specified columns ('tid', 'date_time', 'cc_num') and add expectations for null values\n",
    "for column in ['tid', 'date_time', 'cc_num']:\n",
    "    expectation_suite_transactions.add_expectation(\n",
    "        ExpectationConfiguration(\n",
    "            expectation_type=\"expect_column_values_to_be_null\",\n",
    "            kwargs={\n",
    "                \"column\": column,\n",
    "                \"mostly\": 0.0,\n",
    "            }\n",
    "        )\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f5f27685-bfcc-44f5-8520-b11a65180f72",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# update the feature group to store this expectation suite\n",
    "combined_fg.save_expectation_suite(expectation_suite_transactions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "347532dd-f900-4145-ac81-043d0ecd99a0",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Update feature descriptions\n",
    "feature_descriptions = [\n",
    "    {\"name\": \"tid\", \"description\": \"Transaction id\"},\n",
    "    {\"name\": \"date_time\", \"description\": \"Transaction time\"},\n",
    "    {\"name\": \"cc_num\", \"description\": \"Number of the credit card performing the transaction\"},\n",
    "    {\"name\": \"category\", \"description\": \"Expense category\"},\n",
    "    {\"name\": \"amount\", \"description\": \"Dollar amount of the transaction\"},\n",
    "    {\"name\": \"latitude\", \"description\": \"Transaction location latitude\"},\n",
    "    {\"name\": \"longitude\", \"description\": \"Transaction location longitude\"},\n",
    "    {\"name\": \"city\", \"description\": \"City in which the transaction was made\"},\n",
    "    {\"name\": \"country\", \"description\": \"Country in which the transaction was made\"},\n",
    "    {\"name\": \"fraud_label\", \"description\": \"Whether the transaction was fraudulent or not\"},\n",
    "    {\"name\": \"age_at_transaction\", \"description\": \"Age of the card holder when the transaction was made\"},\n",
    "    {\"name\": \"days_until_card_expires\", \"description\": \"Card validity days left when the transaction was made\"},\n",
    "]\n",
    "\n",
    "for desc in feature_descriptions:\n",
    "    combined_fg.update_feature_description(desc[\"name\"], desc[\"description\"])"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
